/*-------------------------------------------------------------------------
 *
 * execReplication.c
 *	  miscellaneous executor routines for logical replication
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/execReplication.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/commit_ts.h"
#include "access/genam.h"
#include "access/gist.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/heapam.h"
#include "catalog/pg_am_d.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"
#include "replication/conflict.h"
#include "replication/logicalrelation.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/typcache.h"


/*
 * Forward declaration: tuples_equal() is defined further down in this file
 * but is called by functions that appear before its definition.
 */
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
						 TypeCacheEntry **eq, Bitmapset *columns);

/*
 * Fill 'skey' with equality scan keys for an index scan on 'idxrel' that
 * looks for the tuple held in 'searchslot'.  The slot is laid out to match
 * 'rel' (*NOT* idxrel!), so each index column is mapped back to its table
 * column to fetch the search value.
 *
 * Returns the number of scan keys that were filled in.
 *
 * This is not a generic routine: idxrel must be the PK, the RI, or an index
 * that can be used for a REPLICA IDENTITY FULL table.  See
 * FindUsableIndexForReplicaIdentityFull() for details.
 *
 * By definition, the replica identity of a rel meets all limitations
 * associated with that.  Note that any other index could also meet these
 * limitations.
 */
static int
build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
						 TupleTableSlot *searchslot)
{
	int2vector *indkey = &idxrel->rd_index->indkey;
	int			nkeys = 0;
	Datum		classdatum;
	oidvector  *opclasses;

	classdatum = SysCacheGetAttrNotNull(INDEXRELID, idxrel->rd_indextuple,
										Anum_pg_index_indclass);
	opclasses = (oidvector *) DatumGetPointer(classdatum);

	/* Emit one scan key per plain-column key attribute of the index. */
	for (int idxcol = 0;
		 idxcol < IndexRelationGetNumberOfKeyAttributes(idxrel);
		 idxcol++)
	{
		int			heapattno = indkey->values[idxcol];
		Oid			inputtype;
		Oid			family;
		Oid			eqop;
		StrategyNumber eqstrategy;
		ScanKey		cur;

		/*
		 * XXX: Expression columns have no table attribute number and are not
		 * supported in the scan key for now, so they are simply skipped.
		 */
		if (!AttributeNumberIsValid(heapattno))
			continue;

		/*
		 * Look up the equality operator of the column's operator class; its
		 * underlying function is what the scan key needs.
		 */
		inputtype = get_opclass_input_type(opclasses->values[idxcol]);
		family = get_opclass_family(opclasses->values[idxcol]);
		eqstrategy = IndexAmTranslateCompareType(COMPARE_EQ,
												 idxrel->rd_rel->relam,
												 family, false);
		eqop = get_opfamily_member(family, inputtype, inputtype, eqstrategy);

		if (!OidIsValid(eqop))
			elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
				 eqstrategy, inputtype, inputtype, family);

		/* Claim the next free scan key and set it up. */
		cur = &skey[nkeys++];
		ScanKeyInit(cur,
					idxcol + 1,
					eqstrategy,
					get_opcode(eqop),
					searchslot->tts_values[heapattno - 1]);
		cur->sk_collation = idxrel->rd_indcollation[idxcol];

		/* A NULL search value has to be flagged on the key. */
		if (searchslot->tts_isnull[heapattno - 1])
			cur->sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
	}

	/* There must always be at least one attribute for the index scan. */
	Assert(nkeys > 0);

	return nkeys;
}


/*
 * Decide, from the result of table_tuple_lock(), whether the caller has to
 * search for the tuple again and retry the lock because of a concurrent
 * modification.
 *
 * Returns true for TM_Updated and TM_Deleted (after logging the reason),
 * false for TM_Ok.  Any other status is reported with elog(ERROR).
 */
static bool
should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
{
	switch (res)
	{
		case TM_Ok:
			return false;

		case TM_Updated:
			/* XXX: Improve handling here */
			if (!ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
				ereport(LOG,
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
						 errmsg("concurrent update, retrying")));
			else
				ereport(LOG,
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
						 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
			return true;

		case TM_Deleted:
			/* XXX: Improve handling here */
			ereport(LOG,
					(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
					 errmsg("concurrent delete, retrying")));
			return true;

		case TM_Invisible:
			elog(ERROR, "attempted to lock invisible tuple");
			break;

		default:
			elog(ERROR, "unexpected table_tuple_lock status: %u", res);
			break;
	}

	/* Only reachable through the error branches above: no refetch. */
	return false;
}

/*
 * Look up a tuple in 'rel' through the index 'idxoid', using the values in
 * 'searchslot' as the search key.
 *
 * When a matching tuple is found it is locked with 'lockmode', its contents
 * are left in 'outslot', and true is returned.  Otherwise false is returned.
 */
bool
RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
							 LockTupleMode lockmode,
							 TupleTableSlot *searchslot,
							 TupleTableSlot *outslot)
{
	ScanKeyData keys[INDEX_MAX_KEYS];
	int			nkeys;
	Relation	idxrel;
	IndexScanDesc scan;
	SnapshotData dirtysnap;
	TypeCacheEntry **eqcache = NULL;
	bool		skip_recheck;
	bool		found;
	bool		restart;

	/* Open the index. */
	idxrel = index_open(idxoid, RowExclusiveLock);

	/* The full-tuple recheck is skipped for the PK / replica identity index */
	skip_recheck = (GetRelationIdentityOrPK(rel) == idxoid);

	InitDirtySnapshot(dirtysnap);

	/* Build the scan key and start the index scan. */
	nkeys = build_replindex_scan_key(keys, rel, idxrel, searchslot);
	scan = index_beginscan(rel, idxrel, &dirtysnap, NULL, nkeys, 0);

	/*
	 * Each iteration is one complete search-and-lock attempt; it is repeated
	 * whenever we had to wait for another transaction or the lock attempt
	 * reported a concurrent change.
	 */
	do
	{
		restart = false;
		found = false;

		index_rescan(scan, keys, nkeys, NULL, 0);

		/* Try to find the tuple */
		while (index_getnext_slot(scan, ForwardScanDirection, outslot))
		{
			TransactionId xwait;

			/*
			 * Avoid expensive equality check if the index is primary key or
			 * replica identity index.
			 */
			if (!skip_recheck)
			{
				if (eqcache == NULL)
					eqcache = palloc0(sizeof(*eqcache) *
									  outslot->tts_tupleDescriptor->natts);

				if (!tuples_equal(outslot, searchslot, eqcache, NULL))
					continue;
			}

			ExecMaterializeSlot(outslot);

			if (TransactionIdIsValid(dirtysnap.xmin))
				xwait = dirtysnap.xmin;
			else
				xwait = dirtysnap.xmax;

			if (TransactionIdIsValid(xwait))
			{
				/*
				 * The tuple is locked: wait for the locking transaction to
				 * finish, then start the search over.
				 */
				XactLockTableWait(xwait, NULL, NULL, XLTW_None);
				restart = true;
			}
			else
			{
				/* Found our tuple and it's not locked */
				found = true;
			}

			break;
		}

		/* Found tuple, try to lock it in the lockmode. */
		if (found)
		{
			TM_FailureData tmfd;
			TM_Result	res;

			PushActiveSnapshot(GetLatestSnapshot());

			res = table_tuple_lock(rel, &(outslot->tts_tid),
								   GetActiveSnapshot(),
								   outslot,
								   GetCurrentCommandId(false),
								   lockmode,
								   LockWaitBlock,
								   0 /* don't follow updates */ ,
								   &tmfd);

			PopActiveSnapshot();

			if (should_refetch_tuple(res, &tmfd))
				restart = true;
		}
	} while (restart);

	index_endscan(scan);

	/* Don't release lock until commit. */
	index_close(idxrel, NoLock);

	return found;
}

/*
 * Report whether the tuples in 'slot1' and 'slot2' hold equal values.
 *
 * 'eq' is a per-attribute array of type cache entries, indexed by attribute
 * offset; entries that are still NULL are looked up here and stored for reuse
 * by later calls.
 *
 * If 'columns' is not null, only the columns specified within it will be
 * considered for the equality check, ignoring all other columns.
 */
static bool
tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
			 TypeCacheEntry **eq, Bitmapset *columns)
{
	TupleDesc	tupdesc = slot1->tts_tupleDescriptor;

	Assert(tupdesc->natts == slot2->tts_tupleDescriptor->natts);

	/* Make sure every attribute of both tuples is deformed. */
	slot_getallattrs(slot1);
	slot_getallattrs(slot2);

	for (int i = 0; i < tupdesc->natts; i++)
	{
		Form_pg_attribute att = TupleDescAttr(tupdesc, i);
		bool		isnull1;
		bool		isnull2;

		/*
		 * Ignore dropped and generated columns as the publisher doesn't send
		 * those
		 */
		if (att->attisdropped || att->attgenerated)
			continue;

		/* Ignore columns that are not listed for checking. */
		if (columns != NULL &&
			!bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
						   columns))
			continue;

		isnull1 = slot1->tts_isnull[i];
		isnull2 = slot2->tts_isnull[i];

		/* NULL on one side only: certainly not equal. */
		if (isnull1 != isnull2)
			return false;

		/* NULL on both sides: considered equal, nothing more to compare. */
		if (isnull1)
			continue;

		/* Look up and remember the equality function on first use. */
		if (eq[i] == NULL)
		{
			TypeCacheEntry *entry;

			entry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
			if (!OidIsValid(entry->eq_opr_finfo.fn_oid))
				ereport(ERROR,
						(errcode(ERRCODE_UNDEFINED_FUNCTION),
						 errmsg("could not identify an equality operator for type %s",
								format_type_be(att->atttypid))));
			eq[i] = entry;
		}

		if (!DatumGetBool(FunctionCall2Coll(&eq[i]->eq_opr_finfo,
											att->attcollation,
											slot1->tts_values[i],
											slot2->tts_values[i])))
			return false;
	}

	return true;
}

/*
 * Look up a tuple in 'rel' that equals the one in 'searchslot' by scanning
 * the whole table sequentially.
 *
 * When a matching tuple is found it is locked with 'lockmode', its contents
 * are left in 'outslot', and true is returned.  Otherwise false is returned.
 *
 * Note that this stops on the first matching tuple.
 *
 * This can obviously be quite slow on tables that have more than few rows.
 */
bool
RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
						 TupleTableSlot *searchslot, TupleTableSlot *outslot)
{
	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
	TypeCacheEntry **eqcache;
	SnapshotData dirtysnap;
	TableScanDesc scan;
	TupleTableSlot *scanslot;
	bool		found;
	bool		restart;

	Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));

	eqcache = palloc0(sizeof(*eqcache) * outslot->tts_tupleDescriptor->natts);

	/* Start a heap scan. */
	InitDirtySnapshot(dirtysnap);
	scan = table_beginscan(rel, &dirtysnap, 0, NULL);
	scanslot = table_slot_create(rel, NULL);

	/*
	 * Each iteration is one complete search-and-lock attempt; it is repeated
	 * whenever we had to wait for another transaction or the lock attempt
	 * reported a concurrent change.
	 */
	do
	{
		restart = false;
		found = false;

		table_rescan(scan, NULL);

		/* Try to find the tuple */
		while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
		{
			TransactionId xwait;

			if (!tuples_equal(scanslot, searchslot, eqcache, NULL))
				continue;

			ExecCopySlot(outslot, scanslot);

			if (TransactionIdIsValid(dirtysnap.xmin))
				xwait = dirtysnap.xmin;
			else
				xwait = dirtysnap.xmax;

			if (TransactionIdIsValid(xwait))
			{
				/*
				 * The tuple is locked: wait for the locking transaction to
				 * finish, then start the search over.
				 */
				XactLockTableWait(xwait, NULL, NULL, XLTW_None);
				restart = true;
			}
			else
			{
				/* Found our tuple and it's not locked */
				found = true;
			}

			break;
		}

		/* Found tuple, try to lock it in the lockmode. */
		if (found)
		{
			TM_FailureData tmfd;
			TM_Result	res;

			PushActiveSnapshot(GetLatestSnapshot());

			res = table_tuple_lock(rel, &(outslot->tts_tid),
								   GetActiveSnapshot(),
								   outslot,
								   GetCurrentCommandId(false),
								   lockmode,
								   LockWaitBlock,
								   0 /* don't follow updates */ ,
								   &tmfd);

			PopActiveSnapshot();

			if (should_refetch_tuple(res, &tmfd))
				restart = true;
		}
	} while (restart);

	table_endscan(scan);
	ExecDropSingleTupleTableSlot(scanslot);

	return found;
}

/*
 * Set up the extra index information needed for conflict detection on the
 * index 'conflictindex', which is looked up among the indexes opened in
 * 'resultRelInfo'.
 */
static void
BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
{
	for (int idx = 0; idx < resultRelInfo->ri_NumIndices; idx++)
	{
		Relation	irel = resultRelInfo->ri_IndexRelationDescs[idx];
		IndexInfo  *iinfo = resultRelInfo->ri_IndexRelationInfo[idx];

		if (RelationGetRelid(irel) == conflictindex)
		{
			/*
			 * This Assert will fail if BuildSpeculativeIndexInfo() is called
			 * twice for the given index.
			 */
			Assert(iinfo->ii_UniqueOps == NULL);

			BuildSpeculativeIndexInfo(irel, iinfo);
		}
	}
}

/*
 * Examine the tuple in 'scanslot' and, if it is recently dead and its
 * deleting transaction has a newer commit timestamp than the one recorded so
 * far, overwrite *delete_xid, *delete_time and *delete_origin with that
 * transaction's data.  This helps ensure that conflict detection uses the
 * most recent and relevant deletion metadata.
 *
 * The output arguments are left untouched when the tuple does not qualify.
 */
static void
update_most_recent_deletion_info(TupleTableSlot *scanslot,
								 TransactionId oldestxmin,
								 TransactionId *delete_xid,
								 TimestampTz *delete_time,
								 RepOriginId *delete_origin)
{
	BufferHeapTupleTableSlot *hslot = (BufferHeapTupleTableSlot *) scanslot;
	HeapTuple	tuple;
	Buffer		buf;
	bool		recently_dead;
	TransactionId deleter;
	TimestampTz commit_ts;
	RepOriginId commit_origin;

	tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
	buf = hslot->buffer;

	/*
	 * We do not consider HEAPTUPLE_DEAD status because it indicates either
	 * tuples whose inserting transaction was aborted (meaning there is no
	 * commit timestamp or origin), or tuples deleted by a transaction older
	 * than oldestxmin, making it safe to ignore them during conflict
	 * detection (See comments atop worker.c for details).
	 *
	 * The status check is done while holding a share lock on the buffer.
	 */
	LockBuffer(buf, BUFFER_LOCK_SHARE);
	recently_dead = (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) ==
					 HEAPTUPLE_RECENTLY_DEAD);
	LockBuffer(buf, BUFFER_LOCK_UNLOCK);

	if (!recently_dead)
		return;

	deleter = HeapTupleHeaderGetUpdateXid(tuple->t_data);
	if (!TransactionIdIsValid(deleter))
		return;

	/* Nothing to compare against if no commit timestamp data is available */
	if (!TransactionIdGetCommitTsData(deleter, &commit_ts, &commit_origin))
		return;

	/* Select the dead tuple with the most recent commit timestamp */
	if (!TimestampDifferenceExceeds(*delete_time, commit_ts, 0))
		return;

	*delete_xid = deleter;
	*delete_time = commit_ts;
	*delete_origin = commit_origin;
}

/*
 * Scan 'rel' sequentially for the most recently deleted tuple that matches
 * the values in 'searchslot' and that VACUUM cannot remove yet.  The
 * transaction ID, origin and commit timestamp of the deleting transaction are
 * handed back through 'delete_xid', 'delete_origin' and 'delete_time'.
 *
 * Returns true if such a deletion was found, i.e. if *delete_time was set to
 * a nonzero value; otherwise the outputs keep their "invalid" initial values.
 *
 * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by
 * transactions with IDs >= 'oldestxmin' are considered recently dead and are
 * eligible for conflict detection.
 *
 * The scan does not stop at the first match: every matching dead tuple is
 * inspected so that the most recent deletion can be identified.  This is
 * crucial because only the latest deletion is relevant for resolving
 * conflicts.
 *
 * For example, consider a scenario on the subscriber where a row is deleted,
 * re-inserted, and then deleted again only on the subscriber:
 *
 *   - (pk, 1) - deleted at 9:00,
 *   - (pk, 1) - deleted at 9:02,
 *
 * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
 *
 * If we mistakenly return the older deletion (9:00), the system may wrongly
 * apply the remote update using a last-update-wins strategy. Instead, we must
 * recognize the more recent deletion at 9:02 and skip the update. See
 * comments atop worker.c for details. Note, as of now, conflict resolution
 * is not implemented. Consequently, the system may incorrectly report the
 * older tuple as the conflicted one, leading to misleading results.
 *
 * The commit timestamp of the deleting transaction is used to determine which
 * tuple was deleted most recently.
 */
bool
RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
								TransactionId oldestxmin,
								TransactionId *delete_xid,
								RepOriginId *delete_origin,
								TimestampTz *delete_time)
{
	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
	Bitmapset  *keycols;
	TypeCacheEntry **eqcache;
	TableScanDesc scan;
	TupleTableSlot *scanslot;

	Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));

	/* Start out with "nothing found". */
	*delete_xid = InvalidTransactionId;
	*delete_origin = InvalidRepOriginId;
	*delete_time = 0;

	/*
	 * If the relation has a replica identity key or a primary key that is
	 * unusable for locating deleted tuples (see
	 * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
	 * necessary. In such cases, comparing the entire tuple is not required,
	 * since the remote tuple might not include all column values. Instead,
	 * the indexed columns alone are sufficient to identify the target tuple
	 * (see logicalrep_rel_mark_updatable).
	 */
	keycols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY);

	/* fallback to PK if no replica identity */
	if (keycols == NULL)
		keycols = RelationGetIndexAttrBitmap(rel,
											 INDEX_ATTR_BITMAP_PRIMARY_KEY);

	eqcache = palloc0(sizeof(*eqcache) *
					  searchslot->tts_tupleDescriptor->natts);

	/*
	 * Start a heap scan using SnapshotAny to identify dead tuples that are
	 * not visible under a standard MVCC snapshot. Tuples from transactions
	 * not yet committed or those just committed prior to the scan are
	 * excluded in update_most_recent_deletion_info().
	 */
	scan = table_beginscan(rel, SnapshotAny, 0, NULL);
	scanslot = table_slot_create(rel, NULL);

	table_rescan(scan, NULL);

	/* Feed every matching tuple to the "most recent deletion" tracker */
	while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
	{
		if (tuples_equal(scanslot, searchslot, eqcache, keycols))
			update_most_recent_deletion_info(scanslot, oldestxmin,
											 delete_xid, delete_time,
											 delete_origin);
	}

	table_endscan(scan);
	ExecDropSingleTupleTableSlot(scanslot);

	return (*delete_time != 0);
}

/*
 * Same job as RelationFindDeletedTupleInfoSeq(), but the candidate deleted
 * tuples are located through an index scan on 'idxoid' instead of a
 * sequential scan.
 *
 * Returns true if a deletion was found, i.e. if *delete_time was set to a
 * nonzero value.
 */
bool
RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
									TupleTableSlot *searchslot,
									TransactionId oldestxmin,
									TransactionId *delete_xid,
									RepOriginId *delete_origin,
									TimestampTz *delete_time)
{
	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
	ScanKeyData keys[INDEX_MAX_KEYS];
	int			nkeys;
	Relation	idxrel;
	IndexScanDesc scan;
	TupleTableSlot *scanslot;
	TypeCacheEntry **eqcache = NULL;
	bool		skip_recheck;

	Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
	Assert(OidIsValid(idxoid));

	/* Start out with "nothing found". */
	*delete_xid = InvalidTransactionId;
	*delete_time = 0;
	*delete_origin = InvalidRepOriginId;

	/* The full-tuple recheck is skipped for the PK / replica identity index */
	skip_recheck = (GetRelationIdentityOrPK(rel) == idxoid);

	scanslot = table_slot_create(rel, NULL);

	idxrel = index_open(idxoid, RowExclusiveLock);

	/* Build scan key. */
	nkeys = build_replindex_scan_key(keys, rel, idxrel, searchslot);

	/*
	 * Start an index scan using SnapshotAny to identify dead tuples that are
	 * not visible under a standard MVCC snapshot. Tuples from transactions
	 * not yet committed or those just committed prior to the scan are
	 * excluded in update_most_recent_deletion_info().
	 */
	scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, nkeys, 0);

	index_rescan(scan, keys, nkeys, NULL, 0);

	/* Feed every matching tuple to the "most recent deletion" tracker */
	while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
	{
		/*
		 * Avoid expensive equality check if the index is primary key or
		 * replica identity index.
		 */
		if (!skip_recheck)
		{
			if (eqcache == NULL)
				eqcache = palloc0(sizeof(*eqcache) *
								  scanslot->tts_tupleDescriptor->natts);

			if (!tuples_equal(scanslot, searchslot, eqcache, NULL))
				continue;
		}

		update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
										 delete_time, delete_origin);
	}

	index_endscan(scan);

	index_close(idxrel, NoLock);

	ExecDropSingleTupleTableSlot(scanslot);

	return (*delete_time != 0);
}

/*
 * Find the tuple that violates the passed unique index (conflictindex).
 *
 * If the conflicting tuple is found, return true and hand back a newly
 * created slot containing the locked tuple in *conflictslot.  Otherwise
 * return false with *conflictslot set to NULL.
 *
 * We lock the tuple to avoid getting it deleted before the caller can fetch
 * the required information. Note that if the tuple is deleted before a lock
 * is acquired, we will retry to find the conflicting tuple again.
 */
static bool
FindConflictTuple(ResultRelInfo *resultRelInfo, EState *estate,
				  Oid conflictindex, TupleTableSlot *slot,
				  TupleTableSlot **conflictslot)
{
	Relation	rel = resultRelInfo->ri_RelationDesc;
	List	   *arbiter;
	ItemPointerData conflictTid;
	TM_FailureData tmfd;
	TM_Result	res;

	*conflictslot = NULL;

	/*
	 * Build additional information required to check constraints violations.
	 * See check_exclusion_or_unique_constraint().
	 */
	BuildConflictIndexInfo(resultRelInfo, conflictindex);

	/*
	 * The one-element arbiter list is the same for every attempt, so build it
	 * once rather than on each retry.
	 */
	arbiter = list_make1_oid(conflictindex);

retry:
	/* Invariant: *conflictslot is NULL every time control gets here. */
	if (ExecCheckIndexConstraints(resultRelInfo, slot, estate,
								  &conflictTid, &slot->tts_tid,
								  arbiter))
	{
		/* No conflicting tuple (any more). */
		list_free(arbiter);
		return false;
	}

	*conflictslot = table_slot_create(rel, NULL);

	PushActiveSnapshot(GetLatestSnapshot());

	res = table_tuple_lock(rel, &conflictTid, GetActiveSnapshot(),
						   *conflictslot,
						   GetCurrentCommandId(false),
						   LockTupleShare,
						   LockWaitBlock,
						   0 /* don't follow updates */ ,
						   &tmfd);

	PopActiveSnapshot();

	if (should_refetch_tuple(res, &tmfd))
	{
		/*
		 * Release this attempt's slot before searching again.  Otherwise the
		 * next attempt would overwrite the pointer with a fresh slot and the
		 * old one, together with whatever it still holds, would never be
		 * dropped.
		 */
		ExecDropSingleTupleTableSlot(*conflictslot);
		*conflictslot = NULL;
		goto retry;
	}

	list_free(arbiter);

	return true;
}

/*
 * Look for tuples conflicting with 'remoteslot' in each arbiter unique index
 * that is also listed in 'recheckIndexes', and report the conflict if at
 * least one is found.
 *
 * The conflict is reported as 'type', unless more than one conflicting tuple
 * was collected, in which case CT_MULTIPLE_UNIQUE_CONFLICTS is used.
 */
static void
CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
					   ConflictType type, List *recheckIndexes,
					   TupleTableSlot *searchslot, TupleTableSlot *remoteslot)
{
	List	   *conflicttuples = NIL;

	/* Check all the unique indexes for conflicts */
	foreach_oid(uniqueidx, resultRelInfo->ri_onConflictArbiterIndexes)
	{
		TupleTableSlot *conflictslot;
		ConflictTupleInfo *info;

		/* Only indexes flagged for recheck are of interest. */
		if (!list_member_oid(recheckIndexes, uniqueidx))
			continue;

		if (!FindConflictTuple(resultRelInfo, estate, uniqueidx, remoteslot,
							   &conflictslot))
			continue;

		/* Record the conflicting tuple along with its transaction info. */
		info = palloc0_object(ConflictTupleInfo);
		info->slot = conflictslot;
		info->indexoid = uniqueidx;

		GetTupleTransactionInfo(conflictslot, &info->xmin,
								&info->origin, &info->ts);

		conflicttuples = lappend(conflicttuples, info);
	}

	/* Report the conflict, if found */
	if (conflicttuples != NIL)
	{
		ConflictType reported = type;

		if (list_length(conflicttuples) > 1)
			reported = CT_MULTIPLE_UNIQUE_CONFLICTS;

		ReportApplyConflict(estate, resultRelInfo, ERROR, reported,
							searchslot, remoteslot, conflicttuples);
	}
}

/*
 * Insert the tuple held in 'slot' into the relation of 'resultRelInfo',
 * create its index entries, and run constraint checks and per-row triggers.
 *
 * Nothing is inserted if a BEFORE ROW INSERT trigger asks to skip the tuple.
 *
 * Caller is responsible for opening the indexes.
 */
void
ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
						 EState *estate, TupleTableSlot *slot)
{
	Relation	rel = resultRelInfo->ri_RelationDesc;
	List	   *recheckIndexes = NIL;
	List	   *arbiters;
	bool		conflict = false;

	/* For now we support only tables. */
	Assert(rel->rd_rel->relkind == RELKIND_RELATION);

	CheckCmdReplicaIdentity(rel, CMD_INSERT);

	/* BEFORE ROW INSERT Triggers; a false result means "do nothing" */
	if (resultRelInfo->ri_TrigDesc &&
		resultRelInfo->ri_TrigDesc->trig_insert_before_row &&
		!ExecBRInsertTriggers(estate, resultRelInfo, slot))
		return;

	if (rel->rd_att->constr)
	{
		/* Compute stored generated columns */
		if (rel->rd_att->constr->has_generated_stored)
			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
									   CMD_INSERT);

		/* Check the constraints of the tuple */
		ExecConstraints(resultRelInfo, slot, estate);
	}

	if (rel->rd_rel->relispartition)
		ExecPartitionCheck(resultRelInfo, slot, estate, true);

	/* OK, store the tuple and create index entries for it */
	simple_table_tuple_insert(rel, slot);

	arbiters = resultRelInfo->ri_onConflictArbiterIndexes;

	if (resultRelInfo->ri_NumIndices > 0)
		recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
											   slot, estate, false,
											   arbiters != NIL,
											   &conflict,
											   arbiters, false);

	/*
	 * Checks the conflict indexes to fetch the conflicting local row and
	 * reports the conflict. We perform this check here, instead of performing
	 * an additional index scan before the actual insertion and reporting the
	 * conflict if any conflicting rows are found. This is to avoid the
	 * overhead of executing the extra scan for each INSERT operation, even
	 * when no conflict arises, which could introduce significant overhead to
	 * replication, particularly in cases where conflicts are rare.
	 *
	 * XXX OTOH, this could lead to clean-up effort for dead tuples added in
	 * heap and index in case of conflicts. But as conflicts shouldn't be a
	 * frequent thing so we preferred to save the performance overhead of
	 * extra scan before each insertion.
	 */
	if (conflict)
		CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
							   recheckIndexes, NULL, slot);

	/*
	 * AFTER ROW INSERT Triggers
	 *
	 * XXX we should in theory pass a TransitionCaptureState object to this
	 * call to capture transition tuples, but after statement triggers don't
	 * actually get fired by replication yet anyway
	 */
	ExecARInsertTriggers(estate, resultRelInfo, slot,
						 recheckIndexes, NULL);

	list_free(recheckIndexes);
}

/*
 * Replace the tuple identified by 'searchslot' with the data in 'slot',
 * update the indexes, and run constraint checks and per-row triggers.
 *
 * Nothing is updated if a BEFORE ROW UPDATE trigger asks to skip the tuple.
 *
 * Caller is responsible for opening the indexes.
 */
void
ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
						 EState *estate, EPQState *epqstate,
						 TupleTableSlot *searchslot, TupleTableSlot *slot)
{
	Relation	rel = resultRelInfo->ri_RelationDesc;
	ItemPointer tid = &(searchslot->tts_tid);
	List	   *recheckIndexes = NIL;
	List	   *arbiters;
	TU_UpdateIndexes update_indexes;
	bool		conflict = false;

	/*
	 * We support only non-system tables, with
	 * check_publication_add_relation() accountable.
	 */
	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
	Assert(!IsCatalogRelation(rel));

	CheckCmdReplicaIdentity(rel, CMD_UPDATE);

	/* BEFORE ROW UPDATE Triggers; a false result means "do nothing" */
	if (resultRelInfo->ri_TrigDesc &&
		resultRelInfo->ri_TrigDesc->trig_update_before_row &&
		!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
							  tid, NULL, slot, NULL, NULL, false))
		return;

	if (rel->rd_att->constr)
	{
		/* Compute stored generated columns */
		if (rel->rd_att->constr->has_generated_stored)
			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
									   CMD_UPDATE);

		/* Check the constraints of the tuple */
		ExecConstraints(resultRelInfo, slot, estate);
	}

	if (rel->rd_rel->relispartition)
		ExecPartitionCheck(resultRelInfo, slot, estate, true);

	simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
							  &update_indexes);

	arbiters = resultRelInfo->ri_onConflictArbiterIndexes;

	/* Index entries are only inserted when the update calls for it. */
	if (resultRelInfo->ri_NumIndices > 0 && update_indexes != TU_None)
		recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
											   slot, estate, true,
											   arbiters != NIL,
											   &conflict, arbiters,
											   update_indexes == TU_Summarizing);

	/*
	 * Refer to the comments above the call to CheckAndReportConflict() in
	 * ExecSimpleRelationInsert to understand why this check is done at this
	 * point.
	 */
	if (conflict)
		CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
							   recheckIndexes, searchslot, slot);

	/* AFTER ROW UPDATE Triggers */
	ExecARUpdateTriggers(estate, resultRelInfo,
						 NULL, NULL,
						 tid, NULL, slot,
						 recheckIndexes, NULL, false);

	list_free(recheckIndexes);
}

/*
 * Delete the tuple identified by 'searchslot' and run the per-row DELETE
 * triggers.
 *
 * Nothing is deleted if a BEFORE ROW DELETE trigger asks to skip the tuple.
 *
 * Caller is responsible for opening the indexes.
 */
void
ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
						 EState *estate, EPQState *epqstate,
						 TupleTableSlot *searchslot)
{
	Relation	rel = resultRelInfo->ri_RelationDesc;
	ItemPointer tid = &searchslot->tts_tid;

	CheckCmdReplicaIdentity(rel, CMD_DELETE);

	/* BEFORE ROW DELETE Triggers; a false result means "do nothing" */
	if (resultRelInfo->ri_TrigDesc &&
		resultRelInfo->ri_TrigDesc->trig_delete_before_row &&
		!ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
							  tid, NULL, NULL, NULL, NULL, false))
		return;

	/* OK, delete the tuple */
	simple_table_tuple_delete(rel, tid, estate->es_snapshot);

	/* AFTER ROW DELETE Triggers */
	ExecARDeleteTriggers(estate, resultRelInfo,
						 tid, NULL, NULL, false);
}

/*
 * Verify that command 'cmd' may be executed on 'rel' given its current
 * replica identity and the publications it belongs to.  Raises an error if
 * not; returns normally otherwise.
 */
void
CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
{
	PublicationDesc pubdesc;

	/*
	 * Nothing to check for partitioned tables, because the operations are
	 * actually performed on the leaf partitions, nor for commands other than
	 * UPDATE and DELETE.
	 */
	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ||
		(cmd != CMD_UPDATE && cmd != CMD_DELETE))
		return;

	/*
	 * It is only safe to execute UPDATE/DELETE if the relation does not
	 * publish UPDATEs or DELETEs, or all the following conditions are
	 * satisfied:
	 *
	 * 1. All columns, referenced in the row filters from publications which
	 * the relation is in, are valid - i.e. when all referenced columns are
	 * part of REPLICA IDENTITY.
	 *
	 * 2. All columns, referenced in the column lists are valid - i.e. when
	 * all columns referenced in the REPLICA IDENTITY are covered by the
	 * column list.
	 *
	 * 3. All generated columns in REPLICA IDENTITY of the relation, are valid
	 * - i.e. when all these generated columns are published.
	 *
	 * XXX We could optimize it by first checking whether any of the
	 * publications have a row filter or column list for this relation, or if
	 * the relation contains a generated column. If none of these exist and
	 * the relation has replica identity then we can avoid building the
	 * descriptor but as this happens only one time it doesn't seem worth the
	 * additional complexity.
	 */
	RelationBuildPublicationDesc(rel, &pubdesc);

	if (cmd == CMD_UPDATE)
	{
		if (!pubdesc.rf_valid_for_update)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
					 errmsg("cannot update table \"%s\"",
							RelationGetRelationName(rel)),
					 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
		else if (!pubdesc.cols_valid_for_update)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
					 errmsg("cannot update table \"%s\"",
							RelationGetRelationName(rel)),
					 errdetail("Column list used by the publication does not cover the replica identity.")));
		else if (!pubdesc.gencols_valid_for_update)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
					 errmsg("cannot update table \"%s\"",
							RelationGetRelationName(rel)),
					 errdetail("Replica identity must not contain unpublished generated columns.")));
	}
	else
	{
		/* cmd is CMD_DELETE here */
		if (!pubdesc.rf_valid_for_delete)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
					 errmsg("cannot delete from table \"%s\"",
							RelationGetRelationName(rel)),
					 errdetail("Column used in the publication WHERE expression is not part of the replica identity.")));
		else if (!pubdesc.cols_valid_for_delete)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
					 errmsg("cannot delete from table \"%s\"",
							RelationGetRelationName(rel)),
					 errdetail("Column list used by the publication does not cover the replica identity.")));
		else if (!pubdesc.gencols_valid_for_delete)
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
					 errmsg("cannot delete from table \"%s\"",
							RelationGetRelationName(rel)),
					 errdetail("Replica identity must not contain unpublished generated columns.")));
	}

	/*
	 * If relation has replica identity we are always good; REPLICA IDENTITY
	 * FULL is also good for UPDATE/DELETE.
	 */
	if (OidIsValid(RelationGetReplicaIndex(rel)) ||
		rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
		return;

	/*
	 * This is UPDATE/DELETE and there is no replica identity.
	 *
	 * Check if the table publishes UPDATES or DELETES.
	 */
	if (cmd == CMD_UPDATE)
	{
		if (pubdesc.pubactions.pubupdate)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
							RelationGetRelationName(rel)),
					 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
	}
	else
	{
		if (pubdesc.pubactions.pubdelete)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
							RelationGetRelationName(rel)),
					 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
	}
}


/*
 * Check if we support writing into specific relkind of local relation and check
 * if it aligns with the relkind of the relation on the publisher.
 *
 * The nspname and relname are only needed for error reporting.
 */
void
CheckSubscriptionRelkind(char localrelkind, char remoterelkind,
						 const char *nspname, const char *relname)
{
	if (localrelkind != RELKIND_RELATION &&
		localrelkind != RELKIND_PARTITIONED_TABLE &&
		localrelkind != RELKIND_SEQUENCE)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
						nspname, relname),
				 errdetail_relkind_not_supported(localrelkind)));

	/*
	 * Allow RELKIND_RELATION and RELKIND_PARTITIONED_TABLE to be treated
	 * interchangeably, but ensure that sequences (RELKIND_SEQUENCE) match
	 * exactly on both publisher and subscriber.
	 */
	if ((localrelkind == RELKIND_SEQUENCE && remoterelkind != RELKIND_SEQUENCE) ||
		(localrelkind != RELKIND_SEQUENCE && remoterelkind == RELKIND_SEQUENCE))
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
		/* translator: 3rd and 4th %s are "sequence" or "table" */
				errmsg("relation \"%s.%s\" type mismatch: source \"%s\", target \"%s\"",
					   nspname, relname,
					   remoterelkind == RELKIND_SEQUENCE ? "sequence" : "table",
					   localrelkind == RELKIND_SEQUENCE ? "sequence" : "table"));
}
